package com.atguigu.app.func;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.common.Constant;
import com.atguigu.utils.HBaseUtil;
import com.atguigu.utils.JedisUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.client.Connection;
import redis.clients.jedis.Jedis;

/**
 * Flink sink that applies dimension-table change records to HBase and keeps the
 * Redis copy of the affected row in step.
 *
 * <p>Each incoming record is a JSON object from which this sink reads:
 * <ul>
 *   <li>{@code data} - the row's column values</li>
 *   <li>{@code type} - the change type; {@code "delete"} and {@code "update"} get
 *       special handling, anything else is treated as a plain write</li>
 *   <li>{@code rowKeyColumn} - name of the field inside {@code data} that holds the primary key</li>
 *   <li>{@code sinkTable} - target HBase table name</li>
 *   <li>{@code columnFamily} - target HBase column family</li>
 *   <li>{@code splitKey} - optional pre-split information used to derive the final row key</li>
 * </ul>
 *
 * <p>Example of the change-log portion of a record (the routing fields listed above are
 * presumably attached by an upstream operator - confirm against the caller):
 * <pre>
 * {"database":"gmall-220623-flink","table":"comment_info","type":"insert","ts":1669162958,
 *  "data":{"id":1595211185799847960,"user_id":119,"sku_id":31,"create_time":"2022-08-02 08:22:38"}}
 * </pre>
 *
 * <p>Every record is written synchronously. A buffered, timer-driven batch write was sketched
 * in an earlier revision but never implemented.
 */
public class DimSinkFunction extends RichSinkFunction<JSONObject> {

    private Connection connection;
    private Jedis jedis;

    /**
     * Acquires the HBase connection and the Redis client used for the lifetime of this sink
     * instance.
     *
     * @param parameters Flink configuration (not used here)
     * @throws Exception if either client cannot be obtained
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        connection = HBaseUtil.getConnection();
        jedis = JedisUtil.getJedis();
    }

    /**
     * Applies one change record to HBase and mirrors the change in Redis.
     *
     * <ul>
     *   <li>{@code delete}: the HBase row is deleted, then the Redis key is removed.</li>
     *   <li>{@code update}: the row is written to HBase, then the Redis entry is refreshed
     *       with a TTL of {@code Constant.ONE_DAY}.</li>
     *   <li>any other type: the row is written to HBase only.</li>
     * </ul>
     *
     * <p>HBase is always written before Redis so that a failed HBase write never leaves the
     * cache holding a row the store does not have.
     *
     * <p>Note: the primary-key field is removed from the record's {@code data} object in place.
     *
     * @param value   the change record described in the class documentation
     * @param context sink context (not used here)
     * @throws Exception if the HBase or Redis operation fails
     */
    @Override
    public void invoke(JSONObject value, Context context) throws Exception {
        // Name of the primary-key field inside "data", e.g. "id".
        String rowKeyColumn = value.getString("rowKeyColumn");
        JSONObject data = value.getJSONObject("data");

        // Extract the primary key and drop it from the payload: it is carried by the
        // row key / Redis key and therefore not stored again as a column.
        String rowKey = data.getString(rowKeyColumn);
        data.remove(rowKeyColumn);

        String sinkTable = value.getString("sinkTable");

        // The Redis key is built from the raw primary key, before any pre-split transformation.
        String redisKey = "DIM:" + sinkTable + ":" + rowKey;

        // For pre-split tables the HBase row key is derived from the primary key and split info.
        String splitKey = value.getString("splitKey");
        if (splitKey != null) {
            rowKey = HBaseUtil.getRowKey(rowKey, splitKey);
        }

        String type = value.getString("type");
        if ("delete".equals(type)) {
            HBaseUtil.deleteData(connection, Constant.HBASE_NAME_SPACE, sinkTable, rowKey);
            jedis.del(redisKey);
            return;
        }

        // Serialize the cache payload up front so it reflects exactly the data handed to HBase.
        boolean refreshCache = "update".equals(type);
        String cachedJson = refreshCache ? data.toJSONString() : null;

        HBaseUtil.putData(connection,
                Constant.HBASE_NAME_SPACE,
                sinkTable,
                rowKey,
                value.getString("columnFamily"),
                data);

        // Only updates refresh Redis, and only after the HBase write has succeeded.
        if (refreshCache) {
            jedis.setex(redisKey, Constant.ONE_DAY, cachedJson);
        }
    }

    /**
     * Releases the HBase connection and the Redis client.
     *
     * <p>Either field may still be {@code null} if {@link #open(Configuration)} failed part-way,
     * and the Redis client is released even when closing the HBase connection throws.
     *
     * @throws Exception if closing either client fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (connection != null) {
                connection.close();
            }
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
    }
}
